package flinkstudy.stream.source.mock;

import flinkstudy.bo.EstateInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 动态无限数据
 *
 * @author daocr
 * @date 2019/12/14
 */
public class DynamicUnlimitedData implements SourceFunction<EstateInfo> {

    private boolean stop = false;

    @Override
    public void run(SourceContext<EstateInfo> ctx) throws Exception {

        ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        while (true) {

            if (stop) {
                return;
            }

            EstateInfo mock = new EstateInfo();
            mock.setCityId(threadLocalRandom.nextInt(10, 15));
            mock.setDistrictId(threadLocalRandom.nextInt(10, 100));
            mock.setHouseId(threadLocalRandom.nextInt(10, 100));
            mock.setPlateId(threadLocalRandom.nextInt(10, 100));
            mock.setPrice(threadLocalRandom.nextLong(1000000, 30000000));
            Date now = new Date();
            mock.setCreateTimeFormat(simpleDateFormat.format(now));
            mock.setCreateTime(now);
            ctx.collect(mock);

            Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        stop = true;
    }
}
